之前,介绍了folly的异步框架,其调度中实际执行任务时由线程池完成。听说Facebook目前已经将内部的future替换成协程coro了,恰巧工作中有相关协程使用的讨论,于是看一下folly中基于C++20协程封装的框架。这里不介绍C++20协程的基本使用方式,想要了解可以看下面两个文档。
基础背景
协程相较于线程,其性能更优,由开发人员自己实现任务切换,而不用操作系统进行切换,避免了操作系统线程切换的开销。同时,协程提供了新的开发范式,协程可以看做一个天然的动态DAG调度框架,当某段处理逻辑计算依赖某个数据时,我们可以通过协程切换,先去获取到对应数据,等拿到对应数据后在切换回原逻辑继续执行。对于在执行过程中才能判断是否要执行的数据,对于静态图来说,其支持较为困难,可能需要在图中增加动态的disable逻辑,而使用协程,其天然支持动态决策,灵活性更高。
例如如下一个简单逻辑:
1 | def Function(): |
对于如上逻辑,其含义是,Function的执行依赖了三个函数FunctionA,FunctionB,FunctionC。其中对于B,C函数来说,其执行依赖于A的结果,对于静态构图,可能的形式为:
1 | FunctionA |
在FunctionB和FunctionC中根据FunctionA的值进行判断,来决定是否执行。
其实现方式较为繁琐,需要将一个节点拆分成为多个节点。
使用协程泽不需要如此繁琐,其实现如上面的伪代码基本一致,可能变成如下形式:
1 | def Function(): |
当我们需要某个数据时,使用协程切换,将执行逻辑切换到对应的数据获取方法上即可,当取回数据后,再回到原函数中继续进行处理。处理协程函数也是放到一个大的线程池中处理。这样,我们将要获取的数据全都直接丢到线程池中,由协程调度来自动寻找其依赖的函数,自动丢到线程池中,这样就只需要一个线程池,一个协程调度,就完美的实现了一个动态dag。
这里还存在一些问题,会在后续讲解中逐步回答:
- 如果一个节点被多个算子依赖,如何避免被重复计算。
- 一个节点依赖多个数据,如果每一次执行到要使用的位置在切换协程获取,那会导致每个字段获取串行执行,如何让依赖尽可能并发执行(这个其实和动态图有一定的冲突,但往往是一个强需求)。
Task使用
folly实现的coro核心是Task类,folly官方文档上有十分详细的介绍,这里只贴出来一个使用样例:
1 |
|
执行结果:
1 | $ ./a.out |
可以看到调用sycn时,其输出严格有序,调用asycn时,其输出就是无序的了。这里说明了一个问题是,调用collectAllRange时,如果task本身是同步方法,则其会被串行调用,如果其本身是异步方法,则调用就会异步执行。
同时可以看到调用异步方法asycn时,在co_await前后执行执行在相同的线程池,虽然我们设置了co_await等待的task在另外的线程池执行。这是因为Task的promise_type的await_transform方法调用了co_viaIfAsync,保证协程始终在指定线程池中执行。当await_suspend返回void或者false时,会立即返回给协程函数的调用者。同时协程处于suspend状态。按照如此逻辑,上面实例代码,在执行async函数内部逻辑时(即async协程第一次被resume,即被co_await时),在调用co_await方法前,逻辑都执行在主线程中,当调用co_await时,直接返回到主流程中开始执行下面的语句了,而async协程被挂起,被co_awiat的协程被分配到线程池中执行,在这些协程执行结束后,重新唤醒async协程,由于co_viaIfAsync方法封装了一层协程,保证被唤醒的async协程依然在原线程池中执行。
这里想要说明的一点是,线程池执行协程函数时,如果被suspend而未拉起其他协程的协程(当await_suspend返回coroutine handle时,会立即执行coroutine handle对应的协程,原协程被挂起,至于执行完成新的协程后的处理逻辑,则由新的协程处理函数决定,可以选择恢复原协程,也可以选择再拉起一个协程,或者什么都不干。相当于使用新的协程上下文替换原协程的上下文,新协程执行逻辑和原协程无关,执行完成也不存在要返回到某个原协程的什么位置的概念),并不会占用线程池,因为被suspend的协程函数,会立即返回到调用处(不是返回到调用co_await的地方,而是协程函数的入口位置),在task中,一般是回到resumeCoroutineWithNewAsyncStackRoot函数中的h.resume();,这样线程就会认为执行完成了该task,会继续从线程池的task任务池中取其他的task。而未完成的协程调用什么时候继续呢,会在线程池调用的某个方法中调用被暂停协程的resume方法时被继续执行。这里也说明一个问题,应该尽可能的避免协程被suspend而不拉起新的协程,因为当出现这种情况时,线程池会需要从task队列中查找新的任务,这势必会造成额外的开销,相当于协程的链式调度切换失效了。不论怎么样,当线程池中执行的都是协程函数时,可以大大减少线程池数量,理论上来说,线程池数量和cpu核数绑定即可。
这里还有一个对协程锁的验证,可以看到,程序最终会卡住,这时因为执行getB()函数时,我们获取了锁,在没有释放的前提下协程被切换到了执行getA()函数,这里再次尝试获取锁,这就造成了死锁。这里说明了,协程锁不能解决由于协程切换造成的死锁问题,使用协程锁,更需要考虑死锁问题,要保证协程切换时锁要被正常释放。
对于这个例子,可以仅仅简单运行一下,当前不必深究,当完整了解coro实现后可以回过来来再看一下。
Task包含了大量的基础类,这里我们进行逐一介绍。
TaskPromiseBase
TaskPromiseBase是Task的promise_type的基类,其决定了返回Task协程函数的实际执行逻辑。因此先介绍其具体实现。
TaskPromiseBase类包含如下成员:
1 | class TaskPromiseBase { |
ExtendedCoroutineHandle
该类是coroutine_handle<void>的拓展版本,其定义如下:
1 | class ExtendedCoroutineHandle { |
其实际存储的是调用co_await的协程的coroutine_handle。用于执行完成当前协程函数后,唤醒原来被切换出去的协程函数。举例来说:
1 | Task<T> func { |
在这个协程函数中,当执行到co_await时,当前协程函数func会被暂停,执行调度到co_await对应的lambda函数中去,但是当对于的lambda执行完成后,如何回到原来的函数呢,这个工作是由Task自己完成的,不需要用户指定,其实现的核心就是这里的continuation_(ExtendedCoroutineHandle类),lambda函数对应的Task中的promise_type会持有func协程的coroutine_handle。当lambda执行完成后,调用coroutine_handle的resume即唤醒func协程。
其中ExtendedCoroutinePromise是扩展的promise,这里其仅仅充当接口(因此是纯虚类),其定义如下:
1 | class ExtendedCoroutinePromise { |
AsyncStackFrame
异步栈帧,其定义如下:
1 | // An async stack frame contains information about a particular |
其中parentFrame代表该异步操作的调用者的栈帧,通过parentFrame将调用栈串连起来,调用链通过一个空指针终止,对于paremtFrame为空指针去情况,要么表示该栈帧是被分离的状态(销毁),要么表示下一帧是阻塞等待异步堆栈完成的线程(栈顶?)。
instructionPointer表示这个栈帧调用者的指令指针。这通常是此异步操作的延续地址,或启动此异步操作的代码的地址。 如果地址未知,则可能为空。该变量的赋值通常使用FOLLY_ASYNC_STACK_RETURN_ADDRESS()方法。
其定义如下:
1 | #define FOLLY_ASYNC_STACK_RETURN_ADDRESS() __builtin_return_address(0) |
其中__builtin_return_address可以看__builtin_return_address。
简单来说__builtin_return_address是编译器内建函数,作用是用于获取当前函数或者调用函数的返回地址,当参数是0时,表示的是当前函数的返回地址,参数为1时表示的是调用该函数的函数返回地址。
stackRoot是指向当前线程栈根的指针(stack root)。通过这里cache该变量,我们就不需要通过读取一个线程纬度的数据来获取该指针了。该指针只对最顶层的栈帧有效,当一个栈帧被入栈或者出栈时,该值需要被进行拷贝到对应的栈帧上。一个例外是最底层的栈帧,如果最底层的栈帧中该值不为空,则表示指向当前被阻塞在等待一个异步线程完成的根上(指向阻塞当前线程的异步线程的根上)。在这种情况下,您可以在AsyncStackRoot中找到有关该线程的堆栈帧的信息,并可以使用它来继续遍历堆栈帧。
AsyncStackRoot
AsyncStackRoot包含如下内容
1 | struct AsyncStackRoot{ |
topFrame指向事件循环或者回调调用中当前正在执行的栈帧。
nextRoot指向当前线程堆栈上下一个事件循环上下文的指针。
stackFramePtr指向在当前线程上注册此 AsyncStackRoot 的函数调用的堆栈帧和返回地址的指针。这通常是负责执行异步回调(通常是事件循环)的堆栈框架。初始化该值的典型方法为FOLLY_ASYNC_STACK_FRAME_POINTER()或者setStackFrameContext()。
returnAddress,通过FOLLY_ASYNC_STACK_RETURN_ADDRESS()或者setStackFrameContext()初始化。其中FOLLY_ASYNC_STACK_RETURN_ADDRESS方法为:
1 |
这同样是编译器内建方法,作用是获取调研函数的返回地址。
线程栈与异步栈
AsyncStackRoot和AsyncStackRoot将普通线程栈和异步栈串连起来,其结构大致如下:
1 | // Current Thread Stack |
AsyncStackFrame和AsyncStackRoot用来串连协程调用的堆栈,使得其与线程调用栈类似。每个协程存在一个AsyncStackFrame,协程之间通过AsyncStackFrame.parentFrame串连起来。每个线程存在一个currentThreadAsyncStackRoot,其存储一个AsyncStackRoot。AsyncStackRoot的topFrame执行当前正在执行的协程栈帧。维护这些调用关系是方便进行debug。
对这些字段的维护设计如下函数:
pushAsyncStackFrameCallerCallee
该函数在一个协程调用另一个协程时执行,构建调用者和被调者的关系,并且维护stackRoot指针(指向线程的AsyncStackRoot)。
其实现如下:
1 | /* |
popAsyncStackFrameCallee
该函数用于调用完成了某个协程后,将协程栈从链表中删除,其逻辑如下:
1 | // calleeFrame表示被调的协程栈 |
ScopedAsyncStackRoot
ScopedAsyncStackRoot不是一个函数,而是一个类,其用来维护线程的AsyncStackRoot。其定义如下:
1 | class ScopedAsyncStackRoot { |
这里的framePointer与returnAddress都是之前提到的编译器函数对其赋值。其构造函数为:
1 | static thread_local AsyncStackRootHolder currentThreadAsyncStackRoot; |
首先初始化一个AsyncStackRoot,之后挺好当前线程的AsyncStackRoot为新建的root。析构函数为:
1 | ScopedAsyncStackRoot::~ScopedAsyncStackRoot() { |
在析构函数中还原会原来线程的AsyncStackRoot。
成员函数activateFrame方法为:
1 | inline void activateAsyncStackFrame( |
设置当前root的栈顶帧。
该类是在一个线程上新起一个协程方法时被调用,folly中目前主要是resumeCoroutineWithNewAsyncStackRoot方法使用,其实现如下:
1 | FOLLY_NOINLINE void resumeCoroutineWithNewAsyncStackRoot( |
该函数的意思是使用一个新的AsyncStackRoot来恢复执行一个协程。并且该协程与当前线程中的协程栈没有关系,因此需要维护一个新的AsyncStackRoot,在该协程调用完成之后,再恢复原来的AsyncStackRoot。
CancellationToken
CancellationToken用于向函数或者操作进行信息传递,用于取消操作。其定义如下:
1 | // A CancellationToken is an object that can be passed into an function or |
其需要配合CancellationSource使用。这里不展开介绍,核心是CancellationSource负责管理cancel逻辑,其存在requestCancellation和getToken两个核心接口。其中requestCancellation用于设置取消逻辑(CancellationToken不能设置取消,只能判断是否被取消),getToken用于生成CancellationToken。所以通过同一个CancellationSource生成的CancellationToken被统一管理,当CancellationSource被设置cancel状态时,所以的CancellationToken都被置为cancel状态。
其中存在merge接口,其输入是多个CancellationToken,并生成一个新的CancellationToken。这里的逻辑是,聚合多个CancellationToken,有应该被置为cancel状态时,新的这个cancel就会被置为cancel状态。其内部实现是新建了一个CancellationSource,将其与参数中的CancellationToken对应的CancellationSource绑定,并设置回调函数。
介绍完了成员变量的类型,我们再来看对应使用该类作为promise_type的协程来说,其执行逻辑。
分配Coroutine state
TaskPromiseBase自定义了分配Coroutine state的函数。
1 | static void* operator new(std::size_t size) { |
其new使用的是__builtin_operator_new。delete使用的是__builtin_operator_delete。这里后面的函数主要作用是进行尾调用优化。具体可以参数尾调用。
懒加载
TaskPromiseBase被默认构造。之后会获取函数返回值,这里不是在TaskPromiseBase中实现,而是在其派生类中实现,这里不做介绍。
当创建完成返回值后,会执行initial_suspend判断,来决定是否可以立即执行协程。其实现为懒加载,即始终不会立即执行。:
1 | suspend_always initial_suspend() noexcept { return {}; } |
co_await时获取awaitable和awaiter
当协程内执行co_await时,会调用await_transform方法,这里实现了一系列的方法:
1 | template <typename Awaitable> |
其核心是第一个函数,即实际调用的是folly::coro::co_withAsyncStack和co_viaIfAsync以及co_withCancellation方法。
对于Task来说,这几个方法都重写了,这里看一下这几个函数的默认方法。
co_withCancellation
其方法核心是将cancel与协程任务绑定,对于Task相关结构来说,其绑定是没有问题的,但是默认情况下不清楚awaitable类型没办法绑定,因此默认的改函数实现是:
1 | FOLLY_DEFINE_CPO(detail::adl::WithCancellationFunction, co_withCancellation) |
即默认情况下直接返回awaitable。
这里,task,TaskWithExecutor有实现该方法(后面会介绍该类),其实现如下:
1 | friend Task co_withCancellation( |
可以看到这里是直接将cancel与协程的promise绑定。
co_viaIfAsync
co_viaIfAsync的作用是保证调用者协程能够始终在指定的executor(线程池)中执行。
其具体实现是对协程再通过框架封装一层框架定义的协程,在框架定义的这一层来实现当前协程被suspend后和在其恢复时依然在原线程池上执行。其实现较为复杂,可以先阅读后面部分,对coro协程有整体了解后再回来看其具体实现。
使用一个简单例子来进行描述:
1 | Task<void> funca() { |
在上面的伪代码中,我们希望funcb在线程池exectutor2中执行,但是希望funca在线程池executor1中执行。在执行funcb时,当其被挂起后,执行funca,由于funca被指定在线程池executor1中执行,当funca执行完成后,恢复funcb的执行时,如果没有co_viaIfAsync的协助,funcb剩下的部分也将直接在executor1中被执行,通过co_viaIfAsync,可以保证funcb均在指定线程池中执行。
下面我们来详细了解其实现逻辑。
1 | FOLLY_DEFINE_CPO(detail::adl::ViaIfAsyncFunction, co_viaIfAsync) |
如果用户实现了自己的co_viaIfAsync方法则优先调用用户自己的方法。之后如果用户实现了awaitable的viaIfAsync方法,则会调用该方法,否则,调用ViaIfAsyncAwaitable。下面主要看ViaIfAsyncAwaitable的实现。
1 | class ViaIfAsyncAwaitable { |
当用户直接co_await folly::coro::co_viaIfAsync(executor_.get_alias(),awaitable)时,实际执行的就变成了co_await ViaIfAsyncAwaitable了(这里假设协程的promise_type没有await_transform方法,这个逻辑一般是直接在await_transform中返回ViaIfAsyncAwaitable)。之后通过ViaIfAsyncAwaitable::co_await方法获取awaiter,该方法只创建一个ViaIfAsyncAwaiter。ViaIfAsyncAwaiter即为这里实际的awaiter。
下面看一下ViaIfAsyncAwaiter的实现:
1 | template <bool IsCallerAsyncStackAware, typename Awaitable> |
在创建ViaIfAsyncAwaiter时会调用CoroutineType::create(std::move(executor))方法和folly::coro::get_awaiter(static_cast<Awaitable&&>(awaitable))方法。其中第一个方法的实现为:
1 | static ViaCoroutine createImpl() { co_return; } |
可以看到在执行第一个方法的时候,调用的是一个空的协程,这里就完成了对原来协程的一层封装,相当于在原协程上又封装了一层协程。该协程对应的ViaCoroutine结构为:
1 | class ViaCoroutinePromiseBase { |
当创建ViaIfAsyncAwaiter时,首先会创建ViaCoroutine::promise_type。之后调用ViaCoroutine::promise_type::get_return_object方法创建ViaCoroutine(这时拿到当前协程的coroutine_handle)。之后挂起。
可以看到创建ViaIfAsyncAwaiter会起一个新的协程,并且suspend在对viaCoroutine_的赋值上。
创建ViaIfAsyncAwaiter的第二个函数执行逻辑为:
1 | template < |
这里是根据Awaitable获取到awaiter,其实现与协程实现一致,根据是否存在co_await函数来决定执行逻辑。
到此,完成了ViaIfAsyncAwaiter的创建于获取。只会执行co_await对awaiter的操作。
首先执行await_ready函数,其实现如下:
1 | decltype(auto) await_ready() noexcept(noexcept(awaiter_.await_ready())) { |
直接根据co_awaiter coro的那个协程(被调协程)来决定是否ready,如果已经ready了,直接执行。
正常情况下ready都是false,此时会调用await_suspend来触发实际执行。其逻辑为:
1 | template <typename Promise> |
这里IsCallerAsyncStackAware是false,可以不考虑该逻辑。参数的continuation为调用者协程的coroutine_handle,即我们需要保证执行位置的协程。将continuation存储到viaCoroutine_,之后执行return awaiter_.await_suspend(viaCoroutine_.getHandle());。将架构封装的这层协程的coroutine_handle作为参数执行被调协程。这时,正常来说会立即执行被调协程,并且在被调协程执行完成之后,会唤醒调用者协程,这里的调用者协程就是架构封装的这一层协程。
这时才会执行完成创建ViaIfAsyncAwaiter时的createImpl函数。在执行完成该函数后析构该协程前,将会执行co_await ViaIfAsyncAwaiter::promise_type::final_suspend,这里将返回ViaIfAsyncAwaiter::promise_type::FinalAwaiter,其定义如下:
1 | struct FinalAwaiter { |
这里await_suspend的参数是架构这层协程的coroutine_handle。
其首先获取promise,设置RequestContext,之后执行scheduleContinuation,其实现如下:
1 | void scheduleContinuation() noexcept { |
可以看到,其实现就是把continuation_的resume添加到指定线程池中执行,这里的continuation_即为调用者协程的coroutine_handle,即我们需要保证执行位置的协程。
至此完成了保证协程在指定线程池上执行的全部逻辑,可以看到,整体实现相当精妙,里面使用了C++的很多特性,值得深入研究。
co_withAsyncStack
co_withAsyncStack与co_viaIfAsync的作用类似,其被用于await_transform()内部,用于在当前协程suspend时将当前协程的调用栈信息暂存起来,在resume时恢复,其实现也于co_viaIfAsync类似,架构封装了一层协程实现,这里不过多介绍,详细信息可以看相关代码。
这里特别注意的是,对于希望自己维护调用栈关系的Awaitables,可以定义tag_invoke函数来自己控制,类似如下代码:
1 | class MyAwaitable { |
对awaiter的处理
获取到awaiter后,会调用其await_ready,await_suspend以及最后的await_resume作为co_await的返回结果。这里都不在TaskPromiseBase的控制范畴,会在后续部分详细介绍。
协程结束
协程结束时,如果协程执行过程中跑出了异常,则会先执行unhandled_exception,这里其定义不在TaskPromiseBase而在TaskPromise,其定义为:
1 | void unhandled_exception() noexcept { |
即设置对应result_为异常。
如果没有异常(有异常也执行),则直接执行co_await TaskPromiseBase::final_suspend()。这里我们看一下其定义:
1 | class FinalAwaiter { |
await_ready()返回false表示为懒加载。因此co_await会执行await_suspend,这里coro是当前协程的coroutine_handle。
其逻辑是首先获取当前协程的promise,这里就是TaskPromise。调用popAsyncStackFrameCallee将当前协程从协程栈中出栈。如果存在异常,则调用continuation_的异常处理函数,并返回coroutine_handle。
当没有异常时,返回continuation_的coroutine_handle。这里的continuation_是调用者协程的coroutine_handle,其会在后续介绍的awaiter的await_suspend中赋值。通过这个逻辑,实现了被调者完成处理后唤醒调用者。
协程结束时需要析构handler,folly的实现是析构交由awaiter来实现。
TaskPromise
Task的promise_type并不是直接使用TaskPromiseBase而是使用的TaskPromise,其在TaskPromiseBase基础上增加一些协程的相关函数。
其定义如下:
1 | template <typename T> |
这里,其主要增加了一个Try<StorageType> result_用来存储协程返回值。实现了异常处理函数unhandled_exception。获取协程返回值get_return_object
1 | template <typename T> |
用户调用co_return是设置协程返回值return_value。这里设置的返回值会是co_await coro的最终返回值,即awaiter的await_resume最后会返回该值。
TaskPromise还有一个TaskPromise<void>的偏例化,其主要实现了void相关的接口,这里不详细介绍。
Task
task是folly coro的核心类,一般对于协程函数返回值都应该是Task。通过task,folly将协程调用链串连起来。其定义如下:
1 | template <typename T> |
根据之前关于promise_type的介绍,这里当co_await Task时,获取到的awaiter是Task::Awaiter。
这里其await_ready始终返回false。一定会执行await_suspend,这里其参数continuation是调用者协程的coroutine_handle,而不是co_await Task里面的这个Task。
在await_suspend,首先设置promise的continuation_为调用者协程的coroutine_handle,配合上面介绍的TaskPromiseBase::FinalAwaiter实现被调协程完成后唤醒调用者协程。这里的if constexpr (detail::promiseHasAsyncFrame_v<Promise>)为true,这里的操作是维护协程的调用栈,将当前协程的调用栈追加到调用链路中。之后返回当前协程的coroutine_handle,这会立即执行当前协程。
当协程执行结束,会调用await_resume获取co_await Task的最终返回值,即co_return expr设置的值。这里实际执行的是Try::value。如果没有设置value,则会抛出异常,但是如果是void值,则不会抛出异常,这是由于Try<void>默认是有值的。因此对于返回Task<void>的协程,可以不执行co_return,对于其他类型的返回一定要执行co_return expr。
对于返回Task<void>的协程来说,一定要特别注意是,虽然可以不执行co_return,但是一定要保证函数是协程,即至少要出现co_await,co_return或者co_yield。由于Task没有将默认构造函数delete,因此如果没有出现这三个关键字,则该函数就不是协程函数,不会按照协程的方式执行(不要问我是怎么知道的…)。例如:
1 | Task<void> a() { |
这就不是一个协程函数。
TaskWithExecutor
Task协程默认运行在调用者线程中,但在对延迟较敏感的服务中,我们需要将不同协程执行在不同线程中,也就是一般说的M:N模式(brpc中bthread也是这种模式,将m个用户态线程映射到n个实际liunx线程中,m远大于n)。为提供该功能,Task提供接口scheduleOn及TaskWithExecutor类。
1 | TaskWithExecutor<T> scheduleOn(Executor::KeepAlive<> executor) && noexcept { |
接口提供一个线程池,设置改task执行在该线程池中。返回TaskWithExecutor,其定义如下:
1 | template <typename T> |
先来看当co_await TaskWithExecutor时返回的awaiter执行逻辑,这里根据之前TaskPromise描述可以推断出这里返回的Awaiter,其执行核心为Awaiter::await_suspend函数,其和Task对应的awaiter核心区别是执行协程的逻辑是在被丢到指定线程池中执行,同时需要维护一下ctx,对于ctx的作用可以参考上一篇介绍future的文档:[RequestContext][https://www.yinkuiwang.cn/2023/01/08/folly%E5%BC%82%E6%AD%A5%E6%A1%86%E6%9E%B6%E4%B8%8EDAG/#RequestContext]。
这里由于被调协程会被丢到线程池中执行,因此调用者协程如果直接在被调协程后被resume则会破坏调用者协程指定执行位置(线程池),因此需要co_viaIfAsync函数(上面有介绍)。
函数Awaiter::await_suspend还有一点需要注意的是,这里返回的是空,表示会挂起调用co_await方法的协程,并返回到调用该协程的地方。加入到线程池的函数逻辑较为简单,只有如下两句:
1 | RequestContextScopeGuard contextScope{std::move(ctx)}; |
这里需要注意的是,调用folly::resumeCoroutineWithNewAsyncStackRoot(coro);时会恢复当前task绑定的协程,如果恢复协程后,协程内部执行co_await返空了,则调用回到folly::resumeCoroutineWithNewAsyncStackRoot(coro)函数中的h.resume()语句,这时体现到线程上,这个函数就执行完成了,不会出现阻塞线程的情况。
这里的还有一个InlineTryAwaitable,似乎只有显示调用startInlineUnsafe时会使用,其await_suspend也是不加到线程池里直接调用,这一般是指调用者和被调者使用的是同一个线程池。
TaskWithExecutor的另一个核心接口是start,其含义是执行当前协程,并返回一个SemiFuture,用户使用SemiFuture来等待调用结束。
其核心是将协程执行状态和一个promise绑定,当协程执行完成后,对promise的SemiFuture赋值。这里的核心点是如何触发协程的执行,其实现方式是再加一层协程,这里就是
1 | template <typename F> |
当调用start时,最终会执行到该方法,其返回InlineTaskDetached定义如下:
1 | struct InlineTaskDetached { |
其实现较为简单,await_transform方法只是对原awaitable增加了一层co_withAsyncStack。最终的协程结束处理(FinalAwaiter)也没干什么,只是维护了一下调用栈并且析构了一下资源。调用该函数返回InlineTaskDetached后会立即调用其start方法。该方法直接将自己持有的协程resume,这时就会执行cb(co_await folly::coro::co_awaitTry(std::move(task)));,从而触发我们实际要等到的协程的执行。而返回给调用者的SemiFuture则给用户做判断是否执行完成,当协程执行完成后,cb函数会完成对promise的setTry,这时调用者获得的SemiFuture就变成完成状态。
等待协程执行结束
folly官方文档介绍等待协程执行结束有两种方式:
- 协程调用
scheduleOn().start() folly::coro::blockingWait(std::move(task).scheduleOn())
第一种方式在TaskWithExecutor中已经介绍过了,这里再来看一下blockingWait的实现。
1 | inline constexpr blocking_wait_fn blocking_wait{}; |
这里执行的主要是第一个operator方法。其中makeRefBlockingWaitTask定义如下:
1 | template < |
这里BlockingWaitTask是一个协程返回值。其定义如下:
1 | template <typename T> |
当调用get方法时,会将自己持有的协程resume。这里实际执行的就是co_await static_cast<Awaitable&&>(awaitable);,也即我们要等到执行结束的协程。之后执行promise.wait()等待协程执行结束。这里需要看一下promise的实现。
1 | class BlockingWaitPromiseBase { |
其中核心在BlockingWaitPromiseBase中,其存在一个folly::fibers::Baton类。该类的实现使用的是futex,可以参考该文档futex。这里不展开介绍(其实是不太会orz)。核心是一个同步原语。folly对其封装了一下,核心是两个接口,一个是post,另一个是wait。其中wait用于等待同步信号,post用于发送信号,在post发送信号前,调用wait的线程会被阻塞,直到另一个线程发送了post信号(有点像条件变量的感觉)。因此这里在将我们等待的协程resum后,就通过wait接口等待协程完成。
这里有一点需要注意,正常来说BlockingWaitPromiseBase持有的协程被恢复后,如果执行完成了,我们等待的协程应该执行结束了才对,为啥还需要使用baton来进行同步呢?这时因为,将BlockingWaitPromiseBase持有的协程resume,该协程不一定(在这里是大概率)表示该协程被执行完成了。当我们resume BlockingWaitPromiseBase持有的协程时,执行co_await static_cast<Awaitable&&>(awaitable);,当我们co_await的协程被指定到特定线程池执行时,执行co_await时调用的await_suspend方法返回就是空(可以看TaskWithExecutor的Awaiter::await_suspend方法),这时会立即回到resume协程的地方,这里就是回到了即回到folly::resumeCoroutineWithNewAsyncStackRoot(coro_)里面的h.resume()这条语句这里,接着执行后面的逻辑。而BlockingWaitPromiseBase持有的协程被挂起,当我们等待的协程执行结束时,会重新唤醒BlockingWaitPromiseBase持有的协程,进行后续处理。因此在这里我们使用promise.wait();等待BlockingWaitPromiseBase被重新唤醒并被执行完成(在执行完成时FinalAwaiter的await_suspend发送post信号告知执行结束),这样才能保证我们等待的协程确定被执行完成。
clollectAll
想DAG中依赖关系一样,一个协程依赖的数据产出可能需要多个协程生成,这时,如果我们按照协程实际依赖的数据,每次都co_await对应的协程,将会导致依赖的协程顺序被触发,串行执行,这在对耗时较为敏感的系统中是不可接受的,我们需要有统一触发多个协程并发执行的接口,这就是这一节要介绍的collectAll接口。其传递的参数是一个Task的list,如果task是异步的,即指定线程池执行,则所有的task会被异步执行,如果task是同步的(没有转换为TaskWithExecutor,则会被同步执行)。
其实现如下:
1 | template < |
可以看到,collectAllRange本身也是一个协程函数,返回值是一个Task。因此我们调用该方法时,拿到task后,还需要co_await task。
具体的每步执行逻辑上面都进行了注解。这里核心需要关注的是detail::Barrier和detail::BarrierTask。
Barrier
Barrier是一个屏障,当所以协程执行完成后,才就绪,其定义如下。
1 | class Barrier { |
其存在三个成员变量。
count_计数,用来记录当前还未执行完成的协程数量。continuation_表示当所有条件就绪后需要执行唤醒的协程。asyncFrame_用于维护协程栈。
这里的Awaiter就是collectAll中最后我们co_await的awaiter。当collectAll co_await时,首先设置了最终要唤醒collectAll函数,并将引用计数减一。当每个协程执行结束时,也会将引用计数减一。
BarrierTask
BarrierTask是collectAll中对每一个task包的一层协程。其定义如下:
1 | class BarrierTask { |
promise_type持有一个barrier的指针,当我们调用BarrierTask的start函数时,传递barrier,将在collectAll中创建的barrier传递到promise_type。调用start函数后,就会恢复当前协程的执行。在执行完成当前协程后,final_suspend返回FinalAwaiter。当co_await该awaiter时,执行await_suspend函数,执行barrier的arrive函数,将barrier计数减一,并根据是否已经减到0了来决定是否唤醒等待的协程。
这里在collectAll中设置barrier为task数量+1中的+1,是collectAll调用co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};使用。调用这个函数,会设置barrier满足条件后唤醒的collectAll,并将当前协程挂起,等所以协程都执行结束后再唤醒该协程。
这里需要注意的是,BarrierTask执行start函数唤醒当前协程时,会执行在collectAll中makeTask函数,即执行
1 | co_await co_viaIfAsync( |
如果这里的semiAwaitable不是异步协程,则会陷入semiAwaitable协程执行对应协程的处理逻辑,执行完成后再返回到makeTask函数。而如果semiAwaitable是异步协程,则执行上述语句后会立即返回到start函数中,继续执行for循环后的其他start方法。
因此,当我们collectAll时,为了让所有协程并发执行,一定要传异步协程,否则所有协程一样会被串行执行,和一个一个co_await没区别(甚至因为套了一层协程更废了)。
promise&future&SharedPromise
在上一节,我们解决了一个协程依赖多个协程产生的数据。但是在实际执行中,往往还有另一个问题,如果应该协程产生的数据被多个协程依赖如何处理。根据前面的学习,我们知道,肯定不能是多个协程同时co_await同一个task(协程正常应该只需要执行一次,并且task上线中,co_await时生成的awaiter都使用了std::exchange(t.coro_, {})方式,不支持多次co_await)。
目前对于该需求的实现,和之前介绍的future方法实现一致,即使用SharedPromise。具体可以参考上一篇文档:future&SharedPromise。
将协程的task绑定到一个SharedPromise上,当某个协程依赖该协程使用数据时,从SharedPromise中获取一个future,co_await该future即可。当协程执行完成后,设置SharedPromise状态为完成状态,这时所有等待的协程都会被唤醒。
这里的关键是对co_await future的实现,要保证在co_await future时协程挂起不阻塞。其实现也较为简单:
1 | template <typename T> |
当co_await future是,返回的awaiter是detail::FutureAwaiter,其实现如下:
1 | template <typename T> |
其实现也较为简单,当co_await future时,如果future已经ready,则立即继续执行就好了,因此await_ready中先判断就绪状态。如果future未就绪,则执行await_suspend时,在future后增加一个callback,其逻辑是在future就绪后唤醒当前协程,同时返回空,挂起当前协程。
例子:
1 |
|
这里借助SharedPromise,我们将协程函数设计成了可重入函数。其中使用的folly::coro::SharedMutexFair是协程的读写锁,后面会进行介绍。其他相关逻辑都有注释介绍,这里就不一一解释了。
核心点
使用协程,并且每个协程都分配一个线程池执行的情况下,执行层面优化点在哪,其实每次的调度都是需要把协程丢到线程池队列去执行,那每一个协程的实际执行需要通过线程池的分发。虽然需要通过线程池来进行分发协程任务,但是线程池在执行协程时基本不会阻塞,这就大大减少了内核调度的开销,在线程池分发协程任务时,可能会由于使用有锁队列造成一定的线程阻塞,但大部分情况来说(协程任务不是特别碎的情况下)这部分开销相对较小,因此使用协程还是会有较大收益。
这时我们再考虑一下folly的future,其实如果我们能够完全按照规范使用future,在任务内部不要死等,而都交于future的调度,其实也不会造成线程阻塞,效率理论上来说和协程应该不会差距很多。因此只要解决当前future实现的异步方法中死等的问题即可,利用coro就很好实现了。所以folly的coro兼容future,只需要在原来任务内部future.get()的地方改成co_await future,并且函数返回值是task即可,这样原来的future和现在的core其实新能应该差距不大。
避免阻塞
协程核心就是避免阻塞造成操作系统对线程的切换开销。因此如何避免阻塞就是协程库需要考虑的核心问题了,folly对原阻塞方法都提供了相应的非阻塞方法,下面我们针对性的进行介绍。
sleep
sleep方法时明显的阻塞调用。folly利用事件驱动框架封装了一个非阻塞版本的sleep。其实现如下:
1 | Task<void> sleep(HighResDuration d, Timekeeper* tk = nullptr); |
folly::coro::Baton在介绍等待协程结束章节已经介绍过了,这里不做赘述。这里的核心是folly::futures::sleep(d, tk).toUnsafeFuture();该方法返回一个future。其实直接将其返回,我们co_await future就可以了,但这里为了返回task,直接将co_await future的实现封装到了函数内部。
folly::futures::sleep(d, tk)方法将超时时间加到全局的事件驱动框架中(EventBase类),该事件驱动框架基于libevent实现,这个没看过,不过可以参考nginx的事件驱动框架nginx时间驱动框架。这里就不展开介绍了,后续如果有时间,可以研究一下。
IO
IO是阻塞的重灾区,对于同步io,是没有办法解决阻塞问题的,只能改成使用异步IO。Facebook开源的thrift rpc支持异步io。可以将异步io返回值设置成future,当io完成后,设置对应的promise即可。这时在业务代码中,我们只需要co_await future即可。
对future支持co_await上面已经介绍过了,这里就不再赘述了。
锁
使用一般的线程锁,当出现锁冲突时,未获取到锁的线程将会被挂起。为了避免由于锁导致的阻塞问题,folly提供了协程锁(协程锁的设计不是为了避免协程切换导致的死锁,而是为了避免协程阻塞)。这里解释上文使用过的协程锁SharedMutexFair。
SharedMutexFair实现时基于自旋锁和原子变量实现的,实际以原子变量状态控制锁信息,自旋锁只在读写原子变量时使用。每次读写完原子变量立即释放锁,避免阻塞。
这里只介绍核心接口及其实现。
1 | class SharedMutexFair : private folly::NonCopyableNonMovable { |
这里有一个核心结构folly::Synchronized<State, folly::SpinLock>。这里不展开启实现细节,我们只需要知道其是同步原语即可,其持有一个state数据,访问其中数据都应该通过auto lock = state_.contextualLock()的lock访问,state里面的数据都可以通过lock利用->操作符直接访问到。调用auto lock = state_.contextualLock()时,不仅获取到了对应存储的数据,同时获取了对应的folly::SpinLock锁,即获取的数据被folly::SpinLock锁保护。这里使用的是folly实现的自旋锁,这里也不展开介绍了,其可以理解为就是linux提供的自旋锁。自旋锁理论上是更废cpu的,但是这里为什么要是有自旋锁呢。这就要考虑自旋锁使用的场景了,自旋锁一般用于预期很快就能获取到锁的场景,这样可以避免像互斥锁一样需要将线程先挂起,再恢复的操作。这正是协程调度时所需要的,由于线程池较少,一般不会有很多锁竞争,即使有锁竞争也应该很快会获取到锁,并且要避免执行协程的线程阻塞,因此这里选取的是自旋锁。
对于可自动释放的锁来说,其实现就比不自动释放的增加了在await_resume中返回自动释放的class,其他没啥区别。下面我们来依次介绍锁的获取与释放。
读锁
1 | inline SharedMutexFair::LockOperation<SharedMutexFair::LockSharedAwaiter> |
当co_await co_lock_shared()时,获取的awaiter是LockSharedAwaiter,其定义如下:
1 | class LockSharedAwaiter : public LockAwaiterBase { |
首先尝试获取读锁,如果获取成功,则继续执行协程。如果失败,则执行await_suspend。在await_suspend中,再次执行一次可以直接上锁的判断,如果可以上锁,则不suspend协程。否则,记录当前协程的continuation_,将当前协程加入到等待列表中。其中如下两句语句比较绕:
1 | *lock->waitersTailNext_ = this; |
第一句是把队尾的指针赋值为当前awaiter,关键是第二句,这里waitersTailNext_是一个双重指针,即LockAwaiterBase**这里将waitersTailNext_指向了当前awaiter的nextAwaiter_结构,则下次再向列表中添加元素时,执行的还是这两个语句,这时,第一条语句*lock->waitersTailNext_ = this;,就是将这一次的nextAwaiter_赋值为指向添加的awaiter。这样,每次对*lock->waitersTailNext_赋值,都是在对链表最后一个awaiter的nextAwaiter_赋值,以此达到串连所有awaiter的目的(妙啊)。这里还有个问题是起始指针,即State的waitersHead_变量,这就要再来看一下State的初始化了:
1 | State() noexcept |
可以看到,waitersTailNext_初始化执行waitersHead_,则第一次执行*lock->waitersTailNext_就是对waitersHead_赋值(好家伙,指针是被他玩明白了)。
至此,完成了等待协程awaiter的串连。
将等待读锁添加到等待链表后,当写锁释放时会遍历链表,对等待的协程加读锁。
释放读锁
释放读锁逻辑如下
1 | void SharedMutexFair::unlock_shared() noexcept { |
这里的核心逻辑是unlockOrGetNextWaitersToResume函数,其作用是获取可以获得锁的列表,其逻辑如下:
1 | SharedMutexFair::LockAwaiterBase* |
可以看到,其逻辑是按照等等的头部属性来拉取满足条件的awaiter,同时加锁。
resumeWaiters逻辑较为简单:
1 | void SharedMutexFair::resumeWaiters(LockAwaiterBase* awaiters) noexcept { |
遍历获取的awaiters,resume即可。但这里有个问题,如果resume协程后,协程串行执行,将会导致效率低下,即使协程本身绑定了executor,也不能保证被挂起后执行依然是异步的,这时就需要使用co_viaIfAsync方法,即在调用co_await时,对awaiter增加一层co_viaIfAsync封装,这就保证协程始终时异步协程(如果executor不为空),并且是被执行在指定的线程池上。这也是为什么返回的awaiter都由LockOperation包一层,因为其定义了viaIfAsync方法。对于task来说,这些是不必要的,但是如果是自己定义的协程promise_type就需要注意,执行锁获取应该使用
1 | const folly::Executor::KeepAlive<> executor = co_await co_current_executor; |
避免被唤醒的协程被串行执行。
自动释放的读锁
自动释放的读锁不需要用户显示调用unlock_shared(),在返回值的生命周期结束会自动释放,接口是:
1 | [[nodiscard]] LockOperation<ScopedLockSharedAwaiter> |
其中实现如下:
1 | class ScopedLockSharedAwaiter : public LockSharedAwaiter { |
可以看到,其相对LockSharedAwaiter唯一区别是其增加了返回值,该返回值将mutex_包起来,在析构时,调用释放锁的函数:
1 | ~SharedLock() { |
其他与正常读锁没什么区别。
写锁
介绍完了读锁,写锁就简单很多了。获取锁接口有两个,一个会自动释放,一个不会,这里只简单介绍不自动释放的。
获取写锁:
1 | inline SharedMutexFair::LockOperation<SharedMutexFair::LockAwaiter> |
释放写锁:
1 | void SharedMutexFair::unlock() noexcept { |
这里直接获取可以添加的队列而没有标记lockedFlagAndReaderCount_为kUnlocked是因为unlockOrGetNextWaitersToResume实现时是直接对unlockOrGetNextWaitersToResume赋值的,而不是再远基础上加减,因此没有必要执行这一步。